package com.hanxiaozhang.advanced.no3currentlimiting;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * 〈一句话功能简述〉<br>
 * 〈〉
 *
 * @author hanxinghua
 * @create 2022/9/24
 * @since 1.0.0
 */
public class Consumer2 {

    private static final String QUEUE_NAME = "limit-01";

    public static void main(String[] args) throws Exception {


        // 创建链接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置RabbitMQ所在主机ip或者主机名
        factory.setHost("localhost");
        // 创建一个连接
        Connection connection = factory.newConnection();
        // 创建一个频道
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println("Waiting for messages.");

        // 设置限流机制
        // 第一个参数：消息本身的大小，如果设置为0，表示对消息本身的大小不限制
        // 第二个参数：告诉RabbitMQ不要一次性给消费者推送大于N个消息，前提是现在这N个消息，已经手动被确认(已经完成)
        // 第三个参数：是否将上面的设置应用于整个通道  true：表示整个通道的消费者 false： 只有当前的Consumer
        channel.basicQos(0, 5, false);
        // 结论：实际上如果不设置的话，分配任务的是一开始就分配好了，必须手动签收

        // 声明消费者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

                System.out.println("消费者2接受到的消息是:" + new String(body));
                try {
                    Thread.sleep(200);
                } catch (Exception err) {

                }
                // 进行手动应答
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // 启动消费者
        channel.basicConsume(QUEUE_NAME, false, defaultConsumer);

    }
}
